#!/usr/bin/env bash
# db-unload -- Unloads a SQL query to a set of sinks
# > db-unload QUERY FORMAT SINK...
#
# GPFDIST_DISABLE=1 will fall back to the PostgreSQL driver's unloading with
# parallel connections, which MAY PRODUCE INCORRECT RESULTS UNLESS QUERY IS
# TOTALLY ORDERED BY COLUMNS WITH NO DUPLICATE VALUES!
#
# GPFDIST_PORT_BASE is the base port number for the gpfdist processes.
# (defaults to a random port in range 10000-59900)
# GPFDIST_MAX_LENGTH (defaults to 1MiB) may need to be increased for larger
# records to be unloaded through gpfdist.
# GPFDIST_LOG_PATH can be set to a path prefix for logging.
##
# Abort on any failing command and on any reference to an unset variable.
set -eu

# Tunables; each keeps a non-empty value inherited from the environment.
GPFDIST_DISABLE=${GPFDIST_DISABLE:-}
GPFDIST_MAX_LENGTH=${GPFDIST_MAX_LENGTH:-$(( 1 << 20 ))}
GPFDIST_LOG_PATH=${GPFDIST_LOG_PATH:-}
# Randomized default base port: a multiple of 100 within 10000..59900.
GPFDIST_PORT_BASE=${GPFDIST_PORT_BASE:-$(( (RANDOM % 500 + 100) * 100 ))}

# Take QUERY and FORMAT off the front of the argument list; whatever is left
# in "$@" afterwards is the list of SINKs, of which at least one is required.
# NOTE(review): `usage` is not defined in this file; presumably it reports the
# message and fails, which stops the script under `set -e` -- confirm.
if [[ $# -eq 0 ]]; then usage "$0" "Missing QUERY"; fi
query=$1
shift
if [[ $# -eq 0 ]]; then usage "$0" "Missing FORMAT"; fi
format=$1
shift
if [[ $# -eq 0 ]]; then usage "$0" "Missing SINK"; fi

# STEP WORD... -- prints the given words as one line on standard error.
STEP() {
    echo "$@" >&2
}

case $format in
    tsj|tsv|csv)
        nsinks=$#
        if [[ $nsinks -gt 1 ]]; then
            # When gpfdist is disabled, replace this process with the PostgreSQL
            # driver's db-unload, passing QUERY, FORMAT and all SINKs along.
            # XXX usually results in incorrect output
            # FIXME Can we find a general way to use the PG driver's parallel
            # connection approach for GP, which seems to be significantly
            # faster than using gpfdist?  E.g., instead of range partitioning
            # the row range, which cannot be used for partitioning the rows
            # exactly, maybe we can partition the domain of gp_segment_id and
            # assign a few to each connection, assuming the #conn << #segments.
            [[ -z $GPFDIST_DISABLE ]] ||
                exec "$DEEPDIVE_HOME"/util/db-driver/postgresql/db-unload "$query" "$format" "$@"

            # Scratch table names embed the 40 hex digits of the query's SHA-1;
            # per-run files go into a fresh directory under the current one.
            queryDigest=$(sha1sum <<<"$query" | cut -b1-40)
            tmpTable=dd_unload_$queryDigest
            tmpTableType=${tmpTable}_type
            tmpDir=$(mktemp -d "$PWD/greenplum.db-unload.XXXXXXX")
            # cleanup -- EXIT handler that tears down what this run set up under
            # "$tmpDir": the gpfdist processes, the sinks, and the directory.
            # Every step is best-effort: the script runs under `set -e`, so a
            # step that was allowed to fail would end the handler early and
            # leave "$tmpDir" behind.
            cleanup() {
                local pidFile pid sinkPath
                # terminate all gpfdist processes that were started; some may
                # have exited already, and none may have been spawned at all
                for pidFile in "$tmpDir"/gpfdist-*.pid; do
                    [[ -e $pidFile ]] || continue  # unmatched glob stays literal
                    while read -r pid; do
                        [[ -z $pid ]] || kill -TERM "$pid" 2>/dev/null || true
                    done <"$pidFile"
                done
                # make sure all sinks are at least once opened and closed
                # e.g., when only a few gpfdist receive records, some named
                # pipe sinks may not get any activity, hanging the other end
                for sinkPath in "$tmpDir"/sink-*; do
                    # -L keeps symlinks whose target has not been created yet
                    [[ -e $sinkPath || -L $sinkPath ]] || continue
                    timeout 1s tee -a "$sinkPath" </dev/null &
                done
                wait || true
                rm -rf "$tmpDir"
            }
            # From here on, every exit path runs cleanup.
            trap cleanup EXIT

            # Stop right away when the gpfdist executable cannot even print its help.
            if ! gpfdist --help >/dev/null 2>&1; then
                error 'gpfdist not working! make it work!'
            fi
            STEP "starting $nsinks gpfdist processes"
            # State for the spawning loop: 1-based sink number, next port to
            # try, and the gpfdist URLs collected so far.
            i=1
            port=$GPFDIST_PORT_BASE
            locations=()
            for sink; do
                # All gpfdist processes run on the local host for now.
                # TODO support sinks on remote hosts, potentially created by
                # non-local compute-drivers, by parsing sinkHost from sink, and
                # spawning gpfdist remotely via ssh or so
                sinkHost=$HOSTNAME
                # Split the sink path and make sure its directory exists.
                sinkDir=$(dirname "$sink")
                sinkName=$(basename "$sink")
                mkdir -p "$sinkDir"
                # spawn_gpfdist GPFDIST_ARG... -- starts one gpfdist process in the
                # background for sink number $i, appending the given arguments.
                # Globals read:    sinkHost, i, tmpDir, GPFDIST_MAX_LENGTH, GPFDIST_LOG_PATH
                # Globals written: port (advanced past ports `nc -z` reports as open)
                # Side effect:     appends the new PID to "$tmpDir"/gpfdist-$i.pid
                spawn_gpfdist() {
                    # skip over ports already taken
                    while nc -z "$sinkHost" "$port"; do port=$((port + 1)); done
                    # Collect the options in an array so that a value containing
                    # whitespace or glob characters (e.g., a GPFDIST_LOG_PATH
                    # under "My Logs/") reaches gpfdist as a single argument.
                    local -a gpfdistOpts=(-p "$port")
                    [[ -z $GPFDIST_MAX_LENGTH ]] || gpfdistOpts+=(-m "$GPFDIST_MAX_LENGTH")
                    [[ -z $GPFDIST_LOG_PATH ]] || gpfdistOpts+=(-l "$GPFDIST_LOG_PATH.$i.log")
                    gpfdist "${gpfdistOpts[@]}" "$@" &>/dev/null &
                    echo $! >>"$tmpDir"/gpfdist-$i.pid
                    # TODO cope with gpfdist failure, e.g., when another process suddenly tried to listen to the picked port number
                }
                # Attach sink number $i to its own gpfdist and record the URL
                # for the external table.  Either way "$tmpDir"/sink-$i ends up
                # referring to the sink, which is what cleanup iterates over.
                sinkLink=$tmpDir/sink-$i
                if ${DEEPDIVE_SHOW_PROGRESS:-true}; then
                    # gpfdist is pointed at tmpDir and the URL names a fresh
                    # named pipe there; `show_progress ... cat` reads that pipe
                    # with its stdout redirected to the actual sink.
                    spawn_gpfdist -d "$tmpDir"
                    rm -f "$sinkLink"
                    mkfifo "$sinkLink"
                    show_progress output_from "unloading [$i/$nsinks]" -- \
                        cat "$sinkLink" >"$sink" &
                    locations+=("gpfdist://$sinkHost:$port/sink-$i")
                else
                    # gpfdist is pointed at the sink's own directory and the
                    # URL names the sink directly.
                    spawn_gpfdist -d "$sinkDir"
                    locations+=("gpfdist://$sinkHost:$port/$sinkName")
                    # keep a symlink with consistent name for easier cleanup
                    ln -sfn "$(cd "$sinkDir" && pwd)/$sinkName" "$sinkLink"
                fi
                # next sink number, and the next port to try first
                (( ++i, ++port ))
            done

            # Render the gpfdist URLs as a parenthesized SQL list of string
            # literals, e.g., ('gpfdist://h:1/a', 'gpfdist://h:2/b'), doubling
            # every single quote inside a URL.  The quote is kept in a variable
            # to sidestep nested-quote parsing inside the expansion.
            sq="'"
            locationsSql=$(printf "'%s', " "${locations[@]//$sq/$sq$sq}")
            locationsSql="(${locationsSql%, })"
            # Map FORMAT to the FORMAT clause of the external table.  The outer
            # case also admits tsj, which has no mapping here; report that
            # instead of tripping over an unset formatSql later.
            case $format in
                tsv) formatSql=TEXT ;;
                csv) formatSql=CSV  ;;
                *) error "$format: unsupported format for unloading to multiple sinks by Greenplum driver" ;;
            esac

            STEP "unloading to $nsinks locations in parallel: $(escape4sh "$query")"
            # One db-execute call that:
            #  1. captures the query's column layout in an empty temp table,
            #  2. (re)creates a writable external table with that layout whose
            #     locations are the gpfdist URLs,
            #  3. runs the query into the external table, and
            #  4. drops the external table again.
            # A leftover from an interrupted run is removed with DROP EXTERNAL
            # TABLE: Greenplum rejects a plain DROP TABLE on an external table,
            # which would make every later run of the same query fail.
            db-execute "
                DROP TABLE IF EXISTS $tmpTableType;
                CREATE TEMP TABLE $tmpTableType AS SELECT * FROM ($query) R LIMIT 0;
                DROP EXTERNAL TABLE IF EXISTS $tmpTable;
                CREATE WRITABLE EXTERNAL TABLE $tmpTable (LIKE $tmpTableType) LOCATION $locationsSql FORMAT '$formatSql';
                INSERT INTO $tmpTable $query;
                DROP EXTERNAL TABLE $tmpTable;
            "
        else
            # Exactly one sink: no gpfdist involved.  This process is replaced
            # by `show_progress ... db-query`, with stdout redirected to the sink.
            sink=$1
            shift
            STEP "unloading to $sink: $(escape4sh "$query")"
            exec show_progress output_from "unloading" -- db-query "$query" "$format" 0 >"$sink"
        fi
        ;;
    *) error "$format: unsupported format by Greenplum driver" ;;
esac
